Skip to content

(按“消息生命周期”顺序给方案,每环都配一句口诀)

阶段丢失场景防护手段口诀
① 生产端Netty 写缓存成功即返回,实际没进 Kafka异步转同步:KafkaProducer 回调 send(record, callback);失败落本地磁盘补偿重试“回调不到不返回,失败写盘慢慢补”
② 消息队列Kafka broker 挂掉,pageCache 未刷盘acks=all + min.insync.replicas=2 + 每条 flush.ms=10 强制刷盘“多副本+强制刷,机器掉电也不怕”
③ 消费端拉取后处理完 先 commit 后业务,宕机时 commit 了但业务没写 Redis手动异步提交:业务成功写 Redis 后再 consumer.commitAsync();失败走重试队列“业务成功再打卡,失败回滚不丢话”

实现代码(Netty → Kafka → Redis 全链路)

java
// 1. 生产端:回调成功才回客户端
producer.send(new ProducerRecord<>("danmu", roomId, msg), (meta, ex) -> {
    if (ex == null) {
        ctx.writeAndFlush(new Ack(danmuId));      // 只在这返回 ack
    } else {
        diskCompensate.write(msg);                // 失败写本地文件
    }
});

// 2. Kafka 配置:零丢失最强组合
props.put("acks", "all");
props.put("min.insync.replicas", "2");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true");   // 幂等生产

// 3. 消费端:业务写完 Redis 再提交 offset
List<ConsumerRecord<String,Danmu>> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,Danmu> r : records) {
    try{
        redisOps.leftPush("danmu:"+r.key(), r.value());   // 写 Redis
        redisOps.expire(...);
        consumer.commitAsync(Collections.singletonMap(
            new TopicPartition(r.topic(), r.partition()),
            new OffsetAndMetadata(r.offset() + 1)), null);
    }catch(Exception e){
        // 抛到重试队列,offset 不提交即可
    }
}

额外兜底

  • 本地补偿任务:扫描 diskCompensate 目录,每 30 s 重新发送到 Kafka

  • 双队列:主队列 danmu + 重试队列 danmu.retry,重试 3 次后入死信队列人工处理